package com.fwmagic.dynamic_rule.service;

import com.fwmagic.dynamic_rule.bean.CacheResult;
import com.fwmagic.dynamic_rule.bean.LogBean;
import com.fwmagic.dynamic_rule.bean.RuleAtomicParam;
import com.fwmagic.dynamic_rule.bean.RuleParam;
import com.fwmagic.dynamic_rule.service.impl.*;
import com.fwmagic.dynamic_rule.utils.RuleCalcUtils;
import com.fwmagic.dynamic_rule.utils.SystemPrintUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.state.ListState;

import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

import static com.fwmagic.dynamic_rule.bean.CacheAvailableLevel.*;

/**
 * 查询路由服务(加入缓存后的版本)---核心计算入口
 * 1.先用缓存管理器去查询缓存数据
 * 2.然后再根据情况调用各类service去计算
 * 3.查询时先查缓存，计算完成后将结果写入到缓存中
 */
@Slf4j
public class QueryRouterServiceV4 {

    private CacheManagerService cacheManagerService;

    private UserProfileQueryService userProfileQueryService;

    private UserActionCountQueryService userActionCountQueryService;

    private UserActionSequenceQueryService userActionSequenceQueryService;

    private UserActionCountQueryService userActionCountQueryClickHouseService;

    private UserActionSequenceQueryService userActionSequenceQueryClickHouseService;

    private ListState<LogBean> listState;

    public QueryRouterServiceV4(ListState<LogBean> listState) throws Exception {
        //创建缓存实例
        cacheManagerService = new CacheManagerService();
        userProfileQueryService = new UserProfileQueryServiceHbaseImpl();


        this.listState = listState;
        //构造底层核心的查询服务State
        userActionCountQueryService = new UserActionCountQueryServiceStateImpl(listState);
        userActionSequenceQueryService = new UserActionSequenceQueryServiceStateImpl(listState);

        //ClickHouse
        userActionCountQueryClickHouseService = new UserActionCountQueryServiceClickHouseImpl();
        userActionSequenceQueryClickHouseService = new UserActionSequenceQueryServiceClickHouseImpl();

    }

    //控制画像条件查询路由

    /**
     * 查询画像条件
     *
     * @param logBean
     * @param ruleParam
     * @return
     */
    public boolean userProfileQuery(LogBean logBean, RuleParam ruleParam) {
        long s = System.currentTimeMillis();
        boolean userProfileMatch = userProfileQueryService.judgeProfileCondition(logBean.getDeviceId(), ruleParam);
        long e = System.currentTimeMillis();
        log.warn("规则：{},用户:{},画像查询条件：{},结果为：{},耗时：{}", ruleParam.getRuleId(), logBean.getDeviceId(), ruleParam.getUserProfileParam(), userProfileMatch, (e - s));
        return userProfileMatch;
    }

    //控制count类条件查询路由

    /**
     * 条件次数查询
     *
     * @param logBean
     * @param ruleParam
     * @return
     */
    public boolean userActionCountQuery(LogBean logBean, RuleParam ruleParam) throws Exception {
        List<RuleAtomicParam> userActionCountParam = ruleParam.getUserActionCountParam();
        if (CollectionUtils.isEmpty(userActionCountParam)) return true;

        /*
         * 查询缓存逻辑
         * 遍历规则中的各个条件，逐个条件去缓存中查询
         * 依据后续查询的结果，如果已经完全匹配的条件，则从规则中剔除
         * 如果是部分有效，则将条件的时间窗口起始点更新为缓存有效窗口的结束点
         */
        //deviceId|eventid-p1-v1-p2-v2
        updateRuleParamByCacheResult(logBean, userActionCountParam);


        //计算查询分界点：当前时间对小时取整 -1
        Long splitPoint = DateUtils.addHours(DateUtils.ceiling(new Date(), Calendar.HOUR), -2).getTime();

        //只查近期的条件组
        ArrayList<RuleAtomicParam> nearRuleAtomicParamsList = new ArrayList<>();
        //只查远期的条件组
        ArrayList<RuleAtomicParam> farRuleAtomicParamsList = new ArrayList<>();
        //查跨时间分界点的条件组
        ArrayList<RuleAtomicParam> crossoverRuleAtomicParamsList = new ArrayList<>();

        //划分条件组
        splitParamList(userActionCountParam, splitPoint, nearRuleAtomicParamsList, farRuleAtomicParamsList, crossoverRuleAtomicParamsList);
        log.info("规则：{},用户：{},count条件组划分，分界点：splitPoint:{},State(近期)组:{}，State & ClickHouse(分界)组:{},ClickHouse(远期)组:{}", ruleParam.getRuleId(), logBean.getDeviceId(), splitPoint, nearRuleAtomicParamsList.size(), crossoverRuleAtomicParamsList.size(), farRuleAtomicParamsList.size());

        if (CollectionUtils.isNotEmpty(nearRuleAtomicParamsList)) {
            ruleParam.setUserActionCountParam(nearRuleAtomicParamsList);
            boolean userActionCountMatch = userActionCountQueryService.queryActionCounts("", ruleParam);

            //将近期查询结果插入缓存
            for (RuleAtomicParam atomicParam : nearRuleAtomicParamsList) {
                String cacheKey = RuleCalcUtils.getCacheKey(logBean.getDeviceId(), atomicParam);
                cacheManagerService.put(cacheKey, atomicParam.getRealCnt(), atomicParam.getOriginRangeStart(), trimUnboundEnd(atomicParam.getOriginRangeEnd(), logBean.getTimeStamp()));
                log.warn("(近期查询)State count查询到的数据插入缓存,key:{},value:{},条件阈值:{},cacheStart:{},cacheEnd:{}", cacheKey, atomicParam.getRealCnt(), atomicParam.getCnt(), atomicParam.getOriginRangeStart(), trimUnboundEnd(atomicParam.getOriginRangeEnd(), logBean.getTimeStamp()));
            }

            log.warn("规则：{},用户：{},State count条件组size:{},查询结果:{}", ruleParam.getRuleId(), logBean.getDeviceId(), nearRuleAtomicParamsList.size(), userActionCountMatch);
            if (!userActionCountMatch) return false;
        }

        if (CollectionUtils.isNotEmpty(crossoverRuleAtomicParamsList)) {
            for (RuleAtomicParam ruleAtomicParam : crossoverRuleAtomicParamsList) {
                long rangeStart = ruleAtomicParam.getRangeStart();
                long rangeEnd = ruleAtomicParam.getRangeEnd();
                String cacheKey = RuleCalcUtils.getCacheKey(logBean.getDeviceId(), ruleAtomicParam);

                //先查询State
                ruleAtomicParam.setRangeStart(splitPoint);
                boolean b = userActionCountQueryService.queryActionCounts("", ruleAtomicParam);
                log.warn("规则：{},用户：{},跨界count查询-State查询:start:{},end:{},value:{},条件阈值:{}", ruleParam.getRuleId(), logBean.getDeviceId(), ruleAtomicParam.getRangeStart(), ruleAtomicParam.getRangeEnd(), ruleAtomicParam.getRealCnt(), ruleAtomicParam.getCnt());

                if (b) {
                    continue;
                }

                //不满足，重置时间窗口，再查询ClickHouse
                ruleAtomicParam.setRangeStart(rangeStart);
                ruleAtomicParam.setRangeEnd(splitPoint);
                log.warn("规则：{},用户：{},跨界count查询-State查询未全部满足，需到ClickHouse中继续查！",ruleParam.getRuleId(), logBean.getDeviceId());
                boolean b1 = userActionCountQueryClickHouseService.queryActionCounts(logBean.getDeviceId(), ruleAtomicParam);
                log.warn("规则：{},用户：{},跨界count查询-ClickHouse查询:start:{},end:{},value:{},条件阈值:{}", ruleParam.getRuleId(), logBean.getDeviceId(), ruleAtomicParam.getRangeStart(), ruleAtomicParam.getRangeEnd(), ruleAtomicParam.getRealCnt(), ruleAtomicParam.getCnt());

                cacheManagerService.put(cacheKey, ruleAtomicParam.getRealCnt(), ruleAtomicParam.getOriginRangeStart(), trimUnboundEnd(ruleAtomicParam.getOriginRangeEnd(), logBean.getTimeStamp()));
                log.warn("规则：{},用户：{},跨界count查询-ClickHouse查询到的数据插入缓存,key:{},value:{},cacheStart:{},cacheEnd:{}",
                        ruleParam.getRuleId(), logBean.getDeviceId(), cacheKey, ruleAtomicParam.getRealCnt(), ruleAtomicParam.getOriginRangeStart(), trimUnboundEnd(ruleAtomicParam.getOriginRangeEnd(), logBean.getTimeStamp()));

                if (!b1) return false;
            }
        }

        if (CollectionUtils.isNotEmpty(farRuleAtomicParamsList)) {
            ruleParam.setUserActionCountParam(farRuleAtomicParamsList);
            boolean userActionCountMatch = userActionCountQueryClickHouseService.queryActionCounts(logBean.getDeviceId(), ruleParam);
            log.warn("规则：{},用户：{},ClickHouse count 查询条件组size:{},总结果:{}", ruleParam.getRuleId(), logBean.getDeviceId(), farRuleAtomicParamsList.size(), userActionCountMatch);
            //存缓存
            for (RuleAtomicParam ruleAtomicParam : farRuleAtomicParamsList) {
                String cacheKey = RuleCalcUtils.getCacheKey(logBean.getDeviceId(), ruleAtomicParam);
                cacheManagerService.put(cacheKey, ruleAtomicParam.getRealCnt(), ruleAtomicParam.getOriginRangeStart(), trimUnboundEnd(ruleAtomicParam.getOriginRangeEnd(), logBean.getTimeStamp()));
                log.warn("规则：{},用户：{},ClickHouse count 查询到的数据插入缓存,key:{},查到的value:{},条件阈值:{},cacheStart:{},cacheEnd:{}", ruleParam.getRuleId(), logBean.getDeviceId(), cacheKey, ruleAtomicParam.getRealCnt(), ruleAtomicParam.getCnt(), ruleAtomicParam.getOriginRangeStart(), trimUnboundEnd(ruleAtomicParam.getOriginRangeEnd(), logBean.getTimeStamp()));
            }
            if (!userActionCountMatch) return false;
        }

        return true;
    }

    private void splitParamList(List<RuleAtomicParam> userActionCountParam, Long splitPoint, ArrayList<RuleAtomicParam> nearRuleAtomicParams, ArrayList<RuleAtomicParam> farRuleAtomicParams, ArrayList<RuleAtomicParam> crossoverRuleAtomicParams) {
        for (RuleAtomicParam param : userActionCountParam) {
            if (param.getRangeEnd() < splitPoint) {
                farRuleAtomicParams.add(param);
            } else if (param.getRangeStart() >= splitPoint) {
                nearRuleAtomicParams.add(param);
            } else {
                crossoverRuleAtomicParams.add(param);
            }
        }
    }

    /**
     * 查询缓存并根据缓存结果更新条件规则
     * 1.缩短条件时间窗口
     * 2.剔除条件
     * 3.不做任何事情
     *
     * @param logBean
     * @param userActionCountParam
     */
    private void updateRuleParamByCacheResult(LogBean logBean, List<RuleAtomicParam> userActionCountParam) {
        int totalSize = userActionCountParam.size();
        for (int i = 0; i < userActionCountParam.size(); i++) {
            //获取条件
            RuleAtomicParam ruleAtomicParam = userActionCountParam.get(i);
            long originStart = ruleAtomicParam.getRangeStart();
            long originEnd = ruleAtomicParam.getRangeEnd();
            //拼接key
            String cacheKey = RuleCalcUtils.getCacheKey(logBean.getDeviceId(), ruleAtomicParam);
            //获取缓存数据
            CacheResult cacheResult = cacheManagerService.get(cacheKey, ruleAtomicParam);
            switch (cacheResult.getAvailableLevel()) {
                case PARTIAL_AVL:
                    //1.更新条件的时间窗口起始点
                    //2.将缓存的value值，放入参数对象的realCnt中
                    ruleAtomicParam.setRangeStart(cacheResult.getTimeEnd() == Long.MAX_VALUE ? logBean.getTimeStamp() + 1 : cacheResult.getTimeEnd());
                    ruleAtomicParam.setRealCnt(cacheResult.getValue());
                    break;
                case WHOLE_AVL:
                    //如果完全有效，剔除该条件
                    userActionCountParam.remove(i);
                    i--;//重置角标，防止两次相同事件相邻时只删除第一个
                    break;
                case UN_AVL:
                    break;
            }

            log.info("用户:{},count缓存avl:{},key:{},value:{},条件阈值：{},缓存start:{},缓存end:{},条件初始start:{},条件初始end:{},条件size:{}",
                    logBean.getDeviceId(), cacheResult.getAvailableLevel(), cacheKey, cacheResult.getValue(), ruleAtomicParam.getCnt(), cacheResult.getTimeStart(), cacheResult.getTimeEnd(),
                    originStart, originEnd, totalSize);
        }
    }

    /**
     * 控制sequence类条件查询路由
     *
     * @param logBean
     * @param ruleParam
     * @return
     * @throws Exception
     */
    public boolean userActionSequenceQuery(LogBean logBean, RuleParam ruleParam) throws Exception {
        //取出规则中的序列条件
        List<RuleAtomicParam> userActionSequenceParam = ruleParam.getUserActionSequenceParam();
        if (CollectionUtils.isEmpty(userActionSequenceParam)) return true;

        // 取出序列条件相关参数
        long originStart = userActionSequenceParam.get(0).getRangeStart();
        long originEnd = userActionSequenceParam.get(0).getRangeEnd();
        //规则中的最大步骤
        int totalStep = userActionSequenceParam.size();

        /*
         * 加入缓存逻辑
         * 比如，原始条件是：[A,B,C,D](t1,t10)
         *      在缓存中查到的是：[A,B](t1,t5)
         *      那么，我们就要把后续的处理条件修改为：[C,D](t6,t10)
         */
        String cacheKey = RuleCalcUtils.getCacheKey(logBean.getDeviceId(), userActionSequenceParam);
        CacheResult cacheResult = cacheManagerService.get(cacheKey, originStart, originEnd, totalStep);
        switch (cacheResult.getAvailableLevel()) {
            case PARTIAL_AVL:
                //1.截断条件序列
                List<RuleAtomicParam> newSequenceList = userActionSequenceParam.subList(cacheResult.getValue(), totalStep);
                ruleParam.setUserActionSequenceParam(newSequenceList);
                //2.更新条件的时间窗口起始点
//                newSequenceList.forEach(param -> param.setRangeStart(cacheResult.getTimeEnd()));
                newSequenceList.get(0).setRangeStart(cacheResult.getTimeEnd());
                //3.将缓存的value值，放入参数对象的MaxStep中
                ruleParam.setUserActionSequenceQueriedMaxStep(cacheResult.getValue());
                log.warn("规则：{},用户：{},sequence查询缓存完毕，avl状态:{},value:{}，startTime:{},endTime:{},结果step:{},条件step:{}", ruleParam.getRuleId(), logBean.getDeviceId(), cacheResult.getAvailableLevel(), cacheResult.getValue(), cacheResult.getTimeStart(), cacheResult.getTimeEnd(), ruleParam.getUserActionSequenceQueriedMaxStep(), totalStep);

                break;
            case UN_AVL:
                log.warn("规则：{},用户：{},sequence查询缓存完毕，avl状态:{},value:{}，startTime:{},endTime:{},结果step:{},条件step:{}", ruleParam.getRuleId(), logBean.getDeviceId(), cacheResult.getAvailableLevel(), cacheResult.getValue(), cacheResult.getTimeStart(), cacheResult.getTimeEnd(), ruleParam.getUserActionSequenceQueriedMaxStep(), totalStep);

                break;
            case WHOLE_AVL:
                //如果完全有效，返回
                log.warn("规则：{},用户：{},sequence查询缓存完毕，avl状态:{},value:{}，startTime:{},endTime:{},结果step:{},条件step:{}", ruleParam.getRuleId(), logBean.getDeviceId(), cacheResult.getAvailableLevel(), cacheResult.getValue(), cacheResult.getTimeStart(), cacheResult.getTimeEnd(), ruleParam.getUserActionSequenceQueriedMaxStep(), totalStep);

                return true;
        }

        //计算查询分界点：当前时间对小时向下取整 -1 或者向上取整-2
        Long splitPoint = DateUtils.addHours(DateUtils.ceiling(new Date(), Calendar.HOUR), -2).getTime();

        RuleAtomicParam param = ruleParam.getUserActionSequenceParam().get(0);
        long rangeStart = param.getRangeStart();//MaxStep是一直累计增加的，所以记录有效值的时效都是这两个时间
        long rangeEnd = param.getRangeEnd();

        //只查近期:State
        if (rangeStart >= splitPoint) {
            //state中查询
            boolean b = userActionSequenceQueryService.queryActionSequence("", ruleParam);
            log.warn("规则：{},用户：{},sequence近期查询State完毕，start:{},splitPoint:{},end:{},结果step:{},条件step:{}",
                    ruleParam.getRuleId(), logBean.getDeviceId(), rangeStart, splitPoint, rangeEnd, ruleParam.getUserActionSequenceQueriedMaxStep(), totalStep);

            //存缓存
            String key = RuleCalcUtils.getCacheKey(logBean.getDeviceId(), ruleParam.getUserActionSequenceParam());
            cacheManagerService.put(key, ruleParam.getUserActionSequenceQueriedMaxStep(), originStart, trimUnboundEnd(originEnd, logBean.getTimeStamp()));
            return b;
            //只查远期：ClickHouse
        } else if (rangeStart < splitPoint && rangeEnd > splitPoint) {
            //分步骤查询
            //1.碰运气，先查State，看是否已满足次序条件
            modifyRangeTime(userActionSequenceParam, splitPoint, rangeEnd);
            int existMaxStep = ruleParam.getUserActionSequenceQueriedMaxStep();
            userActionSequenceQueryService.queryActionSequence("", ruleParam);
            log.warn("规则：{},用户：{},sequence跨界-State查询碰运气，结果step:{},条件step:{}", ruleParam.getRuleId(), logBean.getDeviceId(), ruleParam.getUserActionSequenceQueriedMaxStep(), totalStep);
            if (ruleParam.getUserActionSequenceQueriedMaxStep() >= totalStep) {
                log.warn("规则：{},用户：{},sequence跨界-State查询碰运气成功，结果step:{},条件step:{}", ruleParam.getRuleId(), logBean.getDeviceId(), ruleParam.getUserActionSequenceQueriedMaxStep(), totalStep);
                List<RuleAtomicParam> params = ruleParam.getUserActionSequenceParam();
                String cacheKey1 = RuleCalcUtils.getCacheKey(logBean.getDeviceId(), params);
                cacheManagerService.put(cacheKey1, ruleParam.getUserActionSequenceQueriedMaxStep(), originStart, trimUnboundEnd(originEnd, logBean.getTimeStamp()));
                return true;
            } else {
                ruleParam.setUserActionSequenceQueriedMaxStep(existMaxStep);
                log.warn("规则：{},用户：{},sequence跨界-State查询碰运气失败，恢复step:{}", ruleParam.getRuleId(), logBean.getDeviceId(), ruleParam.getUserActionSequenceQueriedMaxStep());
            }

            //2.如果1中的不满足，则按照先查远期的ClickHouse再查State的逻辑查询计算
            modifyRangeTime(userActionSequenceParam, rangeStart, splitPoint);
            userActionSequenceQueryClickHouseService.queryActionSequence(logBean.getDeviceId(), ruleParam);
            log.warn("规则：{},用户：{},sequence跨界-ClickHouse查询，结果step:{},条件step:{}", ruleParam.getRuleId(), logBean.getDeviceId(), ruleParam.getUserActionSequenceQueriedMaxStep(), totalStep);
            if (ruleParam.getUserActionSequenceQueriedMaxStep() >= totalStep) {
                //存缓存
                List<RuleAtomicParam> list = ruleParam.getUserActionSequenceParam();
                String cacheKey1 = RuleCalcUtils.getCacheKey(logBean.getDeviceId(), list);
                cacheManagerService.put(cacheKey1, ruleParam.getUserActionSequenceQueriedMaxStep(), originStart, splitPoint);
                return true;
            }
            //远期步骤不匹配，记住步骤数，查近期数据
            int farMaxStep = ruleParam.getUserActionSequenceQueriedMaxStep();

            //查近期数据
            modifyRangeTime(userActionSequenceParam, splitPoint, rangeEnd);
            //截段次序数据，从上一步不匹配的位置开始查
            List<RuleAtomicParam> subList = userActionSequenceParam.subList(farMaxStep, userActionSequenceParam.size());
            ruleParam.setUserActionSequenceParam(subList);
            userActionSequenceQueryService.queryActionSequence("", ruleParam);
            log.warn("规则：{},用户：{},sequence跨界-ClickHouse查询，结果step:{},截断条件step:{},阈值step:{}", ruleParam.getRuleId(), logBean.getDeviceId(), ruleParam.getUserActionSequenceQueriedMaxStep(), subList.size(), totalStep);

            //存缓存
            String cacheKey1 = RuleCalcUtils.getCacheKey(logBean.getDeviceId(), userActionSequenceParam);
            cacheManagerService.put(cacheKey1, ruleParam.getUserActionSequenceQueriedMaxStep(), originStart, trimUnboundEnd(originEnd, logBean.getTimeStamp()));
            return ruleParam.getUserActionSequenceQueriedMaxStep() >= totalStep;
        } else {//rangeEnd < splitPoint
            //ClickHouse中查询
            boolean b = userActionSequenceQueryClickHouseService.queryActionSequence(logBean.getDeviceId(), ruleParam);
            log.warn("规则：{},用户：{},sequence远期查询ClickHouse完毕，start:{},splitPoint:{},end:{},结果step:{},条件step:{}",
                    ruleParam.getRuleId(), logBean.getDeviceId(), rangeStart, splitPoint, rangeEnd, ruleParam.getUserActionSequenceQueriedMaxStep(), totalStep);

            List<RuleAtomicParam> list = ruleParam.getUserActionSequenceParam();
            String cacheKey1 = RuleCalcUtils.getCacheKey(logBean.getDeviceId(), list);
            cacheManagerService.put(cacheKey1, ruleParam.getUserActionSequenceQueriedMaxStep(), list.get(0).getRangeStart(), trimUnboundEnd(list.get(0).getRangeEnd(), logBean.getTimeStamp()));
            return b;
        }
    }

    private void modifyRangeTime(List<RuleAtomicParam> userActionSequenceParam, long rangeStart, Long rangEnd) {
        for (RuleAtomicParam ruleAtomicParam : userActionSequenceParam) {
            ruleAtomicParam.setRangeStart(rangeStart);
            ruleAtomicParam.setRangeEnd(rangEnd);
        }
    }

    /**
     * 超界时间范围处理
     *
     * @param mayBeUnbound
     * @param trimed
     * @return
     */
    private long trimUnboundEnd(long mayBeUnbound, long trimed) {
//        return mayBeUnbound == Long.MAX_VALUE ? trimed : mayBeUnbound;
        return mayBeUnbound >= trimed ? trimed : mayBeUnbound;
    }
}
